package com.kilomob.grady.client;

import io.netty.channel.Channel;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kilomob.grady.exception.GradyException;
import com.kilomob.grady.model.Request;
import com.kilomob.grady.model.Response;

public class DefaultFuture  {

    private static Logger LOGGER = LoggerFactory.getLogger(DefaultFuture.class);

    private static final Map<String, Channel>       CHANNELS   = new ConcurrentHashMap<String, Channel>();

    private static final Map<String, DefaultFuture> FUTURES   = new ConcurrentHashMap<String, DefaultFuture>();

    // invoke id.
    private final String                            id;

    private final Channel                         channel;
    
    private final Request                         request;

    private final int                             timeout;

    private final Lock                            lock = new ReentrantLock();

    private final Condition                       done = lock.newCondition();

    private final long                            start = System.currentTimeMillis();

    private volatile long                         sent;
    
    private volatile Response                     response;
    

    public DefaultFuture(Channel channel, Request request, int timeout){
        this.channel = channel;
        this.request = request;
        this.id = request.getMessageId();
        this.timeout = timeout > 0 ? timeout : 10000;
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
    
    public Response get() throws GradyException {
        return get(timeout);
    }

    public Response get(int timeout) throws GradyException {
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
        return response;
    }
    

    public boolean isDone() {
        return response != null;
    }
    
    private String getId() {
        return id;
    }
    
    private Channel getChannel() {
        return channel;
    }
    
    private boolean isSent() {
        return sent > 0;
    }

    public Request getRequest() {
        return request;
    }

    private int getTimeout() {
        return timeout;
    }

    private long getStartTimestamp() {
        return start;
    }

    public static DefaultFuture getFuture(long id) {
        return FUTURES.get(id);
    }

    public static boolean hasFuture(Channel channel) {
        return CHANNELS.containsValue(channel);
    }

    public static void sent(Channel channel, Request request) {
        DefaultFuture future = FUTURES.get(request.getMessageId());
        if (future != null) {
            future.doSent();
        }
    }

    private void doSent() {
        sent = System.currentTimeMillis();
    }

    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getMessageId());
            if (future != null) {
                future.doReceived(response);
            } else {
                LOGGER.warn("The timeout response finally returned at " 
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                            + ", response " + response);
            }
        } finally {
            CHANNELS.remove(response.getMessageId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
    }


    private static class RemotingInvocationTimeoutScan implements Runnable {

        public void run() {
            /*while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
//                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setError("time out");
                            // handle response.
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    LOGGER.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }*/
        }
    }

    static {
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "GradyResponseTimeoutScanTimer");
        th.setDaemon(true);
        th.start();
    }

}